Skip to content

并发编程之 AQS(AbstractQueuedSynchronizer)框架

什么是 AQS?

AQS 的全称为(AbstractQueuedSynchronizer),这个类在 java.util.concurrent.locks 包下面。AQS 是一个用来构建锁和同步器的框架,比如 ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。

AQS 是怎么实现的?

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,将暂时获取不到锁的线程加入到队列中。

AQS 的实现有两种模式,独占模式(独占锁)和共享模式(共享锁)。独占模式是只有一个线程能够访问资源。而共享模式可以允许多个线程访问资源(例如:读写锁)。每次只要原子更新 state 的值从 0 变为 1 成功了就获取了锁,可重入是通过不断把 state 原子更新加 1 实现的。

  1. 锁状态变量 state: AQS 用一个变量(volatile int state) 属性来表示锁的状态,子类通过 CAS 原子操作去维护这个状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。
  2. AQS 队列: AQS 中维护了一个队列,获取锁失败(非 tryLock())的线程都将进入这个队列中排队,等待锁释放后唤醒下一个排队的线程(互斥锁模式下)。
  3. Condition 队列: AQS 中还有另一个非常重要的内部类 ConditionObject,它实现了 Condition 接口,主要用于实现条件锁。如 ReentrantLock 中的 Condition 和 signal() 方法。
  4. 模板设计模式: AQS 这个抽象类里面定义了一系列的模板方法,如果我们自己要写一个同步器时,只需要子类实现特定的几个方法,就可以完成一个同步器,示例如下。

自定义实现 AQS 框架(自己写一个类似于 ReentrantLock 的锁)

继承 AQS 实现其主要方法

java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

public class CustomSync extends AbstractQueuedSynchronizer {
    @Override
    public boolean tryAcquire(long acquires) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }


    @Override
    protected boolean tryRelease(long arg) {

        if(getState() == 0) {
            throw new IllegalMonitorStateException();
        }
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState()==1;
    }


    public Condition newCondition() {
        return new ConditionObject();
    }
}

实现 Lock 接口实现加锁解锁

java
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

@Slf4j(topic = "enjoy")
public class LockTest10 implements Lock {
    CustomSync customSync = new CustomSync();


    @Override
    public void lock() {
        customSync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        customSync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return customSync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return customSync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        customSync.release(1);
    }

    @Override
    public Condition newCondition() {
        return customSync.newCondition();
    }


    public static void main(String[] args) throws InterruptedException {
        LockTest10 l = new LockTest10();
        new Thread(()->{
            l.lock();
            log.debug("xxx");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            l.unlock();
        },"t1").start();

        TimeUnit.SECONDS.sleep(1);
        l.lock();
        log.debug("main");
        l.unlock();
    }
}